/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.engine.meta.crawler

import com.chinamobile.cmss.lakehouse.engine.meta.crawler.Generator.{buildTree, resolveMaybeTable}
import com.chinamobile.cmss.lakehouse.engine.meta.crawler.model.{MayBeTable, Node}
import com.typesafe.config.Config
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.slf4j.LoggerFactory

/**
 * Crawler job that scans a directory tree and reports the table candidates
 * found in it.
 *
 * @param jobConfig job configuration handed to `FileSystemHolder`, which
 *                  supplies the file system and the scan path
 */
case class Generator(jobConfig: Config) {
  private val logger = LoggerFactory.getLogger(getClass)

  /**
   * Builds the directory tree below the configured scan path and resolves
   * the table candidates from it.
   *
   * @return the table candidates resolved from the scanned tree
   */
  def run(): List[MayBeTable] = {
    val fsHolder = FileSystemHolder(jobConfig)
    logger.info("scan path: {}", fsHolder.scanPath)
    val tree = buildTree(fsHolder.fs, fsHolder.scanPath)
    resolveMaybeTable(tree)
  }
}

/**
 * Tree construction and table-candidate resolution used by the crawler job.
 */
object Generator {
  private val logger = LoggerFactory.getLogger(getClass)

  /**
   * Recursively builds a [[Node]] for `path`.
   *
   * Every sub-directory becomes a child node. The node's last field tells
   * whether all files located directly in `path` share one suffix, where the
   * suffix is the text after the last "." (a name without "." counts as its
   * own suffix). The flag is `false` when the directory holds no files or
   * when the reference file name ends with ".".
   *
   * @param fs   file system used to list directories
   * @param path directory to list
   * @return the node describing `path` and its sub-directories
   */
  def buildTree(fs: FileSystem, path: Path): Node = {
    def suffixOf(fileName: String): String =
      fileName.substring(fileName.lastIndexOf(".") + 1)

    // Reversing before partitioning keeps both lists in the reverse of the
    // order returned by listStatus, so the reference file below is the last
    // listed file and children are emitted in reverse listing order.
    val (subDirs, plainFiles) =
      fs.listStatus(path).toList.reverse.partition(_.isDirectory)

    val childNodes = subDirs.map(entry => buildTree(fs, entry.getPath))

    val sameSuffix = plainFiles.map(_.getPath.getName) match {
      case Nil => false
      case reference :: _ if reference.endsWith(".") => false
      case reference :: others =>
        val expected = suffixOf(reference)
        others.forall(other => suffixOf(other) == expected)
    }

    Node(path.getName, path.toString, childNodes, sameSuffix)
  }

  /**
   * Walks the tree rooted at `node` and collects table candidates.
   *
   * Rules applied to each node:
   *  - a node without children, with the same-files flag set and a non-blank
   *    name, is a candidate of depth 1;
   *  - otherwise only children that have children of their own or have the
   *    same-files flag set are visited. When the node name is non-blank,
   *    there is more than one visited child, every visited child produced
   *    exactly one candidate and all of those share one depth, they are
   *    merged into a single candidate for this node (keeping that depth)
   *    whose partition paths are gathered from the children;
   *  - in every other case the children's candidates are passed upwards with
   *    their depth increased by one.
   *
   * @param node root of the tree to inspect
   * @return the resolved candidates, without their depth
   */
  def resolveMaybeTable(node: Node): List[MayBeTable] = {
    // Passes candidates up one level unchanged apart from their depth.
    def oneLevelUp(found: List[List[(Int, MayBeTable)]]): List[(Int, MayBeTable)] =
      found.flatten.map { case (level, table) => (level + 1, table) }

    def walk(current: Node): List[(Int, MayBeTable)] =
      current match {
        case Node(name, path, Nil, true) if StringUtils.isNotBlank(name) =>
          val candidate = MayBeTable(name, path, List())
          logger.info("find candidate {}", candidate)
          List((1, candidate))

        case Node(name, path, children, _) =>
          val perChild = children
            .filter(child => child.children.nonEmpty || child.hasSameFiles)
            .map(walk)
          val oneCandidateEach =
            perChild.nonEmpty && perChild.forall(_.size == 1) && StringUtils.isNotBlank(name)

          if (!oneCandidateEach) {
            oneLevelUp(perChild)
          } else {
            val singles = perChild.map(_.head)
            val depth = singles.head._1
            val sameDepth = perChild.size > 1 && singles.tail.map(_._1).forall(_ == depth)
            if (!sameDepth) {
              oneLevelUp(perChild)
            } else {
              // merge tables
              logger.info("merge tables {}", singles)
              val emptyTable = MayBeTable(tableName = name, tablePath = path, List())
              val merged = singles.map(_._2).foldRight(emptyTable) {
                // A child without partitions contributes its own path ...
                case (MayBeTable(_, childPath, Nil), acc) =>
                  acc.copy(partitionPaths = childPath :: acc.partitionPaths)
                // ... an already merged child contributes its partitions.
                case (MayBeTable(_, _, childPartitions), acc) =>
                  acc.copy(partitionPaths = childPartitions ::: acc.partitionPaths)
              }
              List((depth, merged))
            }
          }
      }

    walk(node).map(_._2)
  }
}
